from kafka import KafkaProducer
import json

# 配置 Kafka 服务器地址
bootstrap_servers = ['localhost:9092']
# 创建 Kafka 生产者实例
producer = KafkaProducer(
    bootstrap_servers=bootstrap_servers,
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# 要发送的消息
message = {'key': 'value', 'message': 'Hello, Kafka!'}

# 要发送到的主题
topic = 'test_topic'

# 发送消息
try:
    future = producer.send(topic, value=message)
    # 等待消息发送结果
    record_metadata = future.get(timeout=10)
    print(f"消息发送成功，主题: {record_metadata.topic}, 分区: {record_metadata.partition}, 偏移量: {record_metadata.offset}")
except Exception as e:
    print(f"消息发送失败: {e}")
finally:
    # 关闭生产者连接
    producer.close()
